/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.calcite.plan.volcano;

import org.apache.calcite.plan.RelOptListener;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.plan.RelOptRuleOperandChildPolicy;
import org.apache.calcite.rel.RelNode;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * <code>VolcanoRuleCall</code> implements the {@link RelOptRuleCall} interface
 * for VolcanoPlanner.
 */
public class VolcanoRuleCall extends RelOptRuleCall {
    //~ Instance fields --------------------------------------------------------

    protected final VolcanoPlanner volcanoPlanner;

    /**
     * List of {@link RelNode} generated by this call. For debugging purposes.
     */
    private List<RelNode> generatedRelList;

    //~ Constructors -----------------------------------------------------------

    /**
     * Creates a rule call, internal, with array to hold bindings.
     *
     * @param planner Planner
     * @param operand First operand of the rule
     * @param rels    Array which will hold the matched relational expressions
     * @param nodeInputs For each node which matched with {@code matchAnyChildren}
     *                   = true, a list of the node's inputs
     */
    protected VolcanoRuleCall(
            VolcanoPlanner planner,
            RelOptRuleOperand operand,
            RelNode[] rels,
            Map<RelNode, List<RelNode>> nodeInputs) {
        super(planner, operand, rels, nodeInputs);
        this.volcanoPlanner = planner;
    }

    /**
     * Creates a rule call.
     *
     * @param planner Planner
     * @param operand First operand of the rule
     */
    VolcanoRuleCall(
            VolcanoPlanner planner,
            RelOptRuleOperand operand) {
        this(
                planner,
                operand,
                new RelNode[operand.getRule().operands.size()],
                ImmutableMap.of());
    }

    //~ Methods ----------------------------------------------------------------

    // implement RelOptRuleCall
    public void transformTo(RelNode rel, Map<RelNode, RelNode> equiv) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Transform to: rel#{} via {}{}", rel.getId(), getRule(),
                    equiv.isEmpty() ? "" : " with equivalences " + equiv);
            if (generatedRelList != null) {
                generatedRelList.add(rel);
            }
        }
        try {
            // It's possible that rel is a subset or is already registered.
            // Is there still a point in continuing? Yes, because we might
            // discover that two sets of expressions are actually equivalent.

            if (LOGGER.isTraceEnabled()) {
                // Cannot call RelNode.toString() yet, because rel has not
                // been registered. For now, let's make up something similar.
                String relDesc =
                        "rel#" + rel.getId() + ":" + rel.getRelTypeName();
                LOGGER.trace("call#{}: Rule {} arguments {} created {}",
                        id, getRule(), Arrays.toString(rels), relDesc);
            }

            if (volcanoPlanner.listener != null) {
                RelOptListener.RuleProductionEvent event =
                        new RelOptListener.RuleProductionEvent(
                                volcanoPlanner,
                                rel,
                                this,
                                true);
                volcanoPlanner.listener.ruleProductionSucceeded(event);
            }

            // Registering the root relational expression implicitly registers
            // its descendants. Register any explicit equivalences first, so we
            // don't register twice and cause churn.
            for (Map.Entry<RelNode, RelNode> entry : equiv.entrySet()) {
                volcanoPlanner.ensureRegistered(
                        entry.getKey(), entry.getValue(), this);
            }
            volcanoPlanner.ensureRegistered(rel, rels[0], this);
            rels[0].getCluster().invalidateMetadataQuery();

            if (volcanoPlanner.listener != null) {
                RelOptListener.RuleProductionEvent event =
                        new RelOptListener.RuleProductionEvent(
                                volcanoPlanner,
                                rel,
                                this,
                                false);
                volcanoPlanner.listener.ruleProductionSucceeded(event);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error occurred while applying rule "
                    + getRule(), e);
        }
    }

    /**
     * Called when all operands have matched.
     */
    protected void onMatch() {
        assert getRule().matches(this);
        volcanoPlanner.checkCancel();
        try {
            if (volcanoPlanner.isRuleExcluded(getRule())) {
                LOGGER.debug("Rule [{}] not fired due to exclusion filter", getRule());
                return;
            }

            for (int i = 0; i < rels.length; i++) {
                RelNode rel = rels[i];
                RelSubset subset = volcanoPlanner.getSubset(rel);

                if (subset == null) {
                    LOGGER.debug(
                            "Rule [{}] not fired because operand #{} ({}) has no subset",
                            getRule(), i, rel);
                    return;
                }

                if (subset.set.equivalentSet != null) {
                    LOGGER.debug(
                            "Rule [{}] not fired because operand #{} ({}) belongs to obsolete set",
                            getRule(), i, rel);
                    return;
                }

                final Double importance =
                        volcanoPlanner.relImportances.get(rel);
                if ((importance != null) && (importance == 0d)) {
                    LOGGER.debug("Rule [{}] not fired because operand #{} ({}) has importance=0",
                            getRule(), i, rel);
                    return;
                }
            }

            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(
                        "call#{}: Apply rule [{}] to {}",
                        id, getRule(), Arrays.toString(rels));
            }

            if (volcanoPlanner.listener != null) {
                RelOptListener.RuleAttemptedEvent event =
                        new RelOptListener.RuleAttemptedEvent(
                                volcanoPlanner,
                                rels[0],
                                this,
                                true);
                volcanoPlanner.listener.ruleAttempted(event);
            }

            volcanoPlanner.ruleCallStack.push(this);
            try {
                getRule().onMatch(this);
            } finally {
                volcanoPlanner.ruleCallStack.pop();
            }

            if (volcanoPlanner.listener != null) {
                RelOptListener.RuleAttemptedEvent event =
                        new RelOptListener.RuleAttemptedEvent(
                                volcanoPlanner,
                                rels[0],
                                this,
                                false);
                volcanoPlanner.listener.ruleAttempted(event);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error while applying rule " + getRule()
                    + ", args " + Arrays.toString(rels), e);
        }
    }

    /**
     * Applies this rule, with a given relational expression in the first slot.
     */
    void match(RelNode rel) {
        assert getOperand0().matches(rel) : "precondition";
        final int solve = 0;
        int operandOrdinal = getOperand0().solveOrder[solve];
        this.rels[operandOrdinal] = rel;
        matchRecurse(solve + 1);
    }

    /**
     * Recursively matches operands above a given solve order.
     *
     * @param solve Solve order of operand (&gt; 0 and &le; the operand count)
     */
    private void matchRecurse(int solve) {
        assert solve > 0;
        assert solve <= rule.operands.size();
        final List<RelOptRuleOperand> operands = getRule().operands;
        if (solve == operands.size()) {
            // We have matched all operands. Now ask the rule whether it
            // matches; this gives the rule chance to apply side-conditions.
            // If the side-conditions are satisfied, we have a match.
            if (getRule().matches(this)) {
                onMatch();
            }
        } else {
            final int operandOrdinal = operand0.solveOrder[solve];
            final int previousOperandOrdinal = operand0.solveOrder[solve - 1];
            boolean ascending = operandOrdinal < previousOperandOrdinal;
            final RelOptRuleOperand previousOperand =
                    operands.get(previousOperandOrdinal);
            final RelOptRuleOperand operand = operands.get(operandOrdinal);
            final RelNode previous = rels[previousOperandOrdinal];

            final RelOptRuleOperand parentOperand;
            final Collection<? extends RelNode> successors;
            if (ascending) {
                assert previousOperand.getParent() == operand;
                parentOperand = operand;
                final RelSubset subset = volcanoPlanner.getSubset(previous);
                successors = subset.getParentRels();
            } else {
                parentOperand = previousOperand;
                final int parentOrdinal = operand.getParent().ordinalInRule;
                final RelNode parentRel = rels[parentOrdinal];
                final List<RelNode> inputs = parentRel.getInputs();
                // if the child is unordered, then add all rels in all input subsets to the successors list
                // because unordered can match child in any ordinal
                if (parentOperand.childPolicy == RelOptRuleOperandChildPolicy.UNORDERED) {
                    if (operand.getMatchedClass() == RelSubset.class) {
                        successors = inputs;
                    } else {
                        List<RelNode> allRelsInAllSubsets = new ArrayList<>();
                        Set<RelNode> duplicates = new HashSet<>();
                        for (RelNode input : inputs) {
                            if (!duplicates.add(input)) {
                                // Ignore duplicate subsets
                                continue;
                            }
                            RelSubset inputSubset = (RelSubset) input;
                            for (RelNode rel : inputSubset.getRels()) {
                                if (!duplicates.add(rel)) {
                                    // Ignore duplicate relations
                                    continue;
                                }
                                allRelsInAllSubsets.add(rel);
                            }
                        }
                        successors = allRelsInAllSubsets;
                    }
                } else if (operand.ordinalInParent < inputs.size()) {
                    // child policy is not unordered
                    // we need to find the exact input node based on child operand's ordinalInParent
                    final RelSubset subset =
                            (RelSubset) inputs.get(operand.ordinalInParent);
                    if (operand.getMatchedClass() == RelSubset.class) {
                        // If the rule wants the whole subset, we just provide it
                        successors = ImmutableList.of(subset);
                    } else {
                        successors = subset.getRelList();
                    }
                } else {
                    // The operand expects parentRel to have a certain number
                    // of inputs and it does not.
                    successors = ImmutableList.of();
                }
            }

            for (RelNode rel : successors) {
                if (!operand.matches(rel)) {
                    continue;
                }
                if (ascending && operand.childPolicy != RelOptRuleOperandChildPolicy.UNORDERED) {
                    // We know that the previous operand was *a* child of its parent,
                    // but now check that it is the *correct* child.
                    if (previousOperand.ordinalInParent >= rel.getInputs().size()) {
                        continue;
                    }
                    final RelSubset input =
                            (RelSubset) rel.getInput(previousOperand.ordinalInParent);
                    List<RelNode> inputRels = input.getRelList();
                    if (!inputRels.contains(previous)) {
                        continue;
                    }
                }

                // Assign "childRels" if the operand is UNORDERED.
                switch (parentOperand.childPolicy) {
                    case UNORDERED:
                        // Note: below is ill-defined. Suppose there's a union with 3 inputs,
                        // and the rule is written as Union.class, unordered(...)
                        // What should be provided for the rest 2 arguments?
                        // RelSubsets? Random relations from those subsets?
                        // For now, Calcite code does not use getChildRels, so the bug is just waiting its day
                        if (ascending) {
                            final List<RelNode> inputs = Lists.newArrayList(rel.getInputs());
                            inputs.set(previousOperand.ordinalInParent, previous);
                            setChildRels(rel, inputs);
                        } else {
                            List<RelNode> inputs = getChildRels(previous);
                            if (inputs == null) {
                                inputs = Lists.newArrayList(previous.getInputs());
                            }
                            inputs.set(operand.ordinalInParent, rel);
                            setChildRels(previous, inputs);
                        }
                }

                rels[operandOrdinal] = rel;
                matchRecurse(solve + 1);
            }
        }
    }
}

// End VolcanoRuleCall.java
